Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add broadcast functionality from triggers #1156

Merged
merged 12 commits into from
Oct 15, 2024

Conversation

filipecabaco
Copy link
Contributor

@filipecabaco filipecabaco commented Sep 17, 2024

What kind of change does this PR introduce?

Adds a new functionality to broadcast db changes. It adds:

  • Adds uuid column to realtime.messages so we can later change all ids from serial id to uuid format
  • Adds realtime.send postgres function to broadcast messages
  • Adds realtime.broadcast_changes postgres function to broadcast broadcast changes specifically and be used in trigger functions
  • Adds BroadcastChanges feature by tracking the WAL of realtime.messages

Example:

CREATE TABLE test_table (
    id serial PRIMARY KEY,
    name text NOT NULL,
    value integer NOT NULL,
    created_at timestamp DEFAULT CURRENT_TIMESTAMP
);

CREATE OR REPLACE FUNCTION broadcast_changes_for_test_table_trigger ()
    RETURNS TRIGGER
    AS $$
DECLARE
    topic text;
BEGIN
    topic = 'event:' || COALESCE(NEW.id::text, OLD.id::text);
    PERFORM
        realtime.broadcast_changes (topic, TG_OP, TG_OP, TG_TABLE_NAME, TG_TABLE_SCHEMA, NEW, OLD, TG_LEVEL);
    RETURN NULL;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER broadcast_changes_for_test_table
    AFTER INSERT OR UPDATE OR DELETE ON test_table
    FOR EACH ROW
    EXECUTE FUNCTION broadcast_changes_for_test_table_trigger ();

-- Insert operation
INSERT INTO test_table (name, value) VALUES ('example_name', 42);

-- Update operation
UPDATE test_table SET value = 100 WHERE name = 'example_name';

-- Delete operation
DELETE FROM test_table WHERE name = 'example_name';

-- Send a message to the topic
SELECT realtime.send (to_jsonb ('{}'::text), 'test', 'test', FALSE);
   topic  | extension |     inserted_at     |     updated_at      |                                                                                                                                       payload                                                                                                                                       | event  | private |                  id                  
---------+-----------+---------------------+---------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+---------+--------------------------------------
 event:1 | broadcast | 2024-09-19 15:55:51 | 2024-09-19 15:55:51 | {"table": "test_table", "record": {"id": 1, "name": "example_name", "value": 42, "created_at": "2024-09-19T15:55:50.539754"}, "schema": "public", "operation": "INSERT", "old_record": null}                                                                                        | INSERT | t       | 39d6a1d8-04a4-4099-a2a3-e3c5a9b4605b
 event:1 | broadcast | 2024-09-19 15:55:53 | 2024-09-19 15:55:53 | {"table": "test_table", "record": {"id": 1, "name": "example_name", "value": 100, "created_at": "2024-09-19T15:55:50.539754"}, "schema": "public", "operation": "UPDATE", "old_record": {"id": 1, "name": "example_name", "value": 42, "created_at": "2024-09-19T15:55:50.539754"}} | UPDATE | t       | a34a5f73-13d2-4d11-b4ca-aaf041f8ad7a
 event:1 | broadcast | 2024-09-19 15:55:56 | 2024-09-19 15:55:56 | {"table": "test_table", "record": null, "schema": "public", "operation": "DELETE", "old_record": {"id": 1, "name": "example_name", "value": 100, "created_at": "2024-09-19T15:55:50.539754"}}                                                                                       | DELETE | t       | 6f5a291c-37b4-4051-96f4-98accdc5b201
 test    | broadcast | 2024-09-19 15:55:58 | 2024-09-19 15:55:58 | "{}"                                                                                                                                                                                                                                                                                | test   | f       | 6cc07f9e-4a59-42dd-9aa1-2d1b15e7827d

Copy link

vercel bot commented Sep 17, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
realtime-demo ⬜️ Ignored (Inspect) Visit Preview Oct 15, 2024 4:18pm

@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch 3 times, most recently from b346ad0 to b27bf56 Compare September 17, 2024 20:16
@chasers
Copy link
Contributor

chasers commented Sep 17, 2024

Should our payload match what we have here in the Webhooks trigger?

@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch 2 times, most recently from 9ab5171 to 0e27113 Compare September 17, 2024 21:15
Copy link

@olirice olirice left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wouldn't hurt to have a check like

  IF TG_LEVEL = 'STATEMENT' THEN
    RAISE EXCEPTION 'realtime.broadcast_changes should be triggered for each row, not for each statement';
  END IF;

to make sure no one tries to "optimize" their trigger for bulk operations

@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch 11 times, most recently from ad4ac2b to 122e4a4 Compare October 2, 2024 11:55
@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch from 122e4a4 to 7dc2e04 Compare October 2, 2024 17:21
@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch from 7bc88ec to 63aa048 Compare October 3, 2024 14:06
@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch from 63aa048 to 54860fd Compare October 3, 2024 16:01
@josevalim
Copy link

Hi @filipecabaco! This looks exciting!

I have two questions:

Today you start a single replication process, but as far as I understand, it has a single handler, and the handler only looks into a single key in the registry. In this case, what value is the registry buying? Is it in case someone reconnects, you get will get a new entry in the registry and dispatch to that? And what happens if multiple clients are trying to subscribe to the same replication? Also, should the replication connection eventually terminate if no one is connected?

@filipecabaco
Copy link
Contributor Author

filipecabaco commented Oct 3, 2024

Hi @josevalim and thank you!

We actually have multiple handlers, one per tenant and the same happens for PosgtresReplication modules. Meaning that we need to register one process of Handler and one process of PostgresReplication per tenant on connect.

Assumptions:

  • We have multiple tenants and each one of them has their own connection details
  • Tenants are identified with a tenant_id

So the flow is as follows:

  • First client joins which means that we test the connection and keep the connection against the Tenant Database for future usage in the Realtime.Tenants.Connect
  • On success, we start their BroadcastChanges.Handler which is registered with the name {:via, Registry, {__MODULE__.Registry, tenant_id}}
  • BroadcastChanges.Handler starts PostgresReplication against realtime.messages which is registered with the name {:via, Registry, {__MODULE__.Registry, tenant_id}}
  • First client connection is upgraded to be a WS (uses Phoenix.Channels)
  • Second client connects, we already have a DB connection via Connect; a message Handler and a PostgresReplication process associated with this tenant_id so nothing is created
  • Second client connection gets upgraded a WS (uses Phoenix.Channels)

On the PostgresReplication / Handler:

  • On trigger, realtime.messages write to the WAL
  • PostgresReplication for tenant_id receives the event and sends it to the Handler of tenant_id
  • Handler receives messages and does the various steps to get it ready to be sent (get relation data (relation) => join relation data with payload content and sets a format (insert) => broadcast (commit)
  • Broadcast uses PubSub to broadcast to users that are connected

Termination:

  • Since all of this is linked to the Realtime.Tenants.Connect, that process will be killed after 5 minutes of no one connecting (already implemented) and everything will be brought down with it.

@josevalim
Copy link

We actually have multiple handlers, one per tenant and the same happens for PosgtresReplication modules. Meaning that we need to register one process of Handler and one process of PostgresReplication per tenant on connect.

Thank you, that makes complete sense! The existing connection is already responsible for setting up the concurrency and termination. With this in mind, I think you could simplify the current implementation?

For example, you might not need the registry for handlers: instead, a handler starts the replication and pass self() as a handler to the replication layer, and the replication layer sends messages to the handler process. This allows you to get rid of the behaviour and of the registry. Testing should still be straight-forward, because you can pass the test process to the replication. The only downside is that you cannot give a result back, but if you move the keep alive handling to the replication, then that should be fine anyway.

You probably don't need a registry for the postgrex replication either? Each connection knows the replication pid and the handler will be directly messaged.

WDYT?

There is even a question if you should have both a handler and a listener and if they could be the same process, but we can discuss this later. :)

@filipecabaco
Copy link
Contributor Author

Makes sense! Will work on removing the registries from both of them and will actually move the broadcast into PostgresReplication.

I did this heavily based on a quick lib I did to learn more about replication ( github.com/filipecabaco/postgres_replication ) and kept that format but for this specific scenario it's just adding more complexity without gains.

Will rework this today / tomorrow.

Thank you for the feedback 🙏

@filipecabaco
Copy link
Contributor Author

@josevalim did the change 👍 feels way simpler and less error prone. Still needed a Registry due to the fact that the tests were creating multiple entries and ended up clashing.

There might be some really heavy improvements to be done on the Realtime.Tenants.Connect module to simplify this.

@josevalim
Copy link

@josevalim did the change 👍 feels way simpler and less error prone. Still needed a Registry due to the fact that the tests were creating multiple entries and ended up clashing.

Fantastic! Final question: how would they clash? Where they named before? If so, why did they have to be named?

@filipecabaco
Copy link
Contributor Author

They were named before yes.

When we do Realtime.Tenants.Connect.lookup_or_start_connection which either fetches the pid or starts the connection. For some reason in testing with the same pid it tries to create multiple instances of postgres replication ending up in an error of "replication slot already in use".

I do suspect that it's due to the way test/integration/rt_channel_test.exs are written where I'm constantly tearing down connections to have a "fresh start" 🤔

@josevalim
Copy link

The tenant should store in it its state if it has a replication connection or not, so you should not have dupes, right? Unless you want to access this information externally, from another process, and you don't want to ask the tenant for the PID every time.

@filipecabaco
Copy link
Contributor Author

I was saving the pid as the way to signal if there was a replication connection but might have fumbled something. will check the code again to see if i can get that name registration removed

buffer = Enum.reverse(buffer)
tenant = Cache.get_tenant_by_external_id(tenant_id)

case BatchBroadcast.broadcast(nil, tenant, %{messages: buffer}, true) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're building up records and then broadcasting on commit?

I think there could be a LOT of changes before the commit message.

Could oom Realtime, and also cause significant delays in clients receiving messages.

If a change hits the wal we should just send it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no strong opinion honestly. it would be less logic so I would be ok with that.

since this is in our side to control, all inserts into realtime.messages should be quite "sane" and probably won't hit any issues

lib/realtime/broadcast_changes/handler.ex Outdated Show resolved Hide resolved
lib/realtime/tenants/connect.ex Outdated Show resolved Hide resolved
lib/realtime/broadcast_changes/handler.ex Show resolved Hide resolved
lib/realtime/broadcast_changes/handler.ex Show resolved Hide resolved
lib/realtime/broadcast_changes/handler.ex Show resolved Hide resolved
@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch from f4c2c19 to a7f3d6a Compare October 12, 2024 10:30
@filipecabaco filipecabaco force-pushed the feat/broadcast-messages-changes branch from a7f3d6a to cebe0c1 Compare October 14, 2024 11:53
@filipecabaco filipecabaco merged commit 2922658 into main Oct 15, 2024
4 checks passed
@filipecabaco filipecabaco deleted the feat/broadcast-messages-changes branch October 15, 2024 16:41
@kiwicopple
Copy link
Member

🎉 This PR is included in version 2.33.0 🎉

The release is available on GitHub release

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants